package com.bc.plugin.redis.hibernate.regions;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.StringUtils;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.spi.Region;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.support.atomic.RedisAtomicLong;

import com.bc.plugin.redis.hibernate.config.RedisCacheFactoryAutoConfiguration;
import com.bc.plugin.redis.hibernate.help.RedisTimestamp;

import lombok.NonNull;

/**
 * Base class for Hibernate cache regions whose entries are stored in Redis.
 *
 * <p>
 * Every entry of a region is stored under a Redis key that starts with the
 * region prefix (the region name with {@code '.'} replaced by {@code ':'} and
 * a trailing {@code ':'}). Entries are written with a time-to-live of
 * {@link #DEFAULT_EXPIRY_IN_SECONDS} plus a few seconds of random jitter.
 *
 * <p>
 * Single-entry operations ({@link #contains(Object)}, {@link #get(Object)},
 * {@link #put(Object, Object)}, {@link #remove(Object)}) and
 * {@link #clear()} are best-effort: a Redis failure is logged at WARN level
 * and otherwise treated as a cache miss / no-op.
 */
public abstract class RedisDataRegion implements Region {
	protected final Logger log = LoggerFactory.getLogger(this.getClass());

	/**
	 * Default time-to-live of a cache entry, in seconds.
	 */
	static final int DEFAULT_EXPIRY_IN_SECONDS = 1800;

	/**
	 * Redis key prefix of this region; always ends with {@code ':'}.
	 */
	private final String regionName;

	/**
	 * Base time-to-live of an entry, in seconds.
	 */
	private final int expiryInSeconds;

	/**
	 * Source of the jitter added to the TTL in {@link #getTimeout()}.
	 */
	private final Random random;

	/**
	 * Redis-backed counter handed to {@link RedisTimestamp} by
	 * {@link #nextTimestamp()}.
	 */
	RedisAtomicLong redisTimestamp;

	/**
	 * Guard used by {@link #get(Object)} so that at most one thread at a time
	 * refreshes an entry's TTL; other threads skip the refresh.
	 */
	AtomicBoolean atomicBoolean = new AtomicBoolean(false);

	/**
	 * Creates a region.
	 *
	 * @param regionName Hibernate region name; {@code '.'} is replaced by
	 *                   {@code ':'} and a trailing {@code ':'} is appended to
	 *                   build the Redis key prefix
	 * @param props      configuration properties; currently not read, the
	 *                   expiry is always {@link #DEFAULT_EXPIRY_IN_SECONDS}
	 */
	public RedisDataRegion(@NonNull String regionName, Properties props) {
		this.regionName = StringUtils.replace(regionName, ".", ":") + ":";
		this.expiryInSeconds = DEFAULT_EXPIRY_IN_SECONDS;
		this.random = new Random();
		// Use the field rather than the overridable getName(): a subclass
		// override would otherwise run before the subclass is initialized.
		this.redisTimestamp = new RedisAtomicLong(this.regionName + "system:nanoTime",
				RedisCacheFactoryAutoConfiguration.TIMESTAMP_REDIS_TEMPLATE);
		log.debug("redis region={}, expiryInSeconds={}", regionName, expiryInSeconds);
	}

	/**
	 * Returns the Redis key prefix of this region.
	 *
	 * @return the region name with {@code '.'} replaced by {@code ':'},
	 *         followed by {@code ':'}
	 */
	@Override
	public String getName() {
		return regionName;
	}

	/**
	 * Does nothing; the region's entries are left in Redis.
	 */
	@Override
	public void destroy() throws CacheException {
		// NOTE: No need to delete region in HA mode
	}

	/**
	 * Checks whether the given key exists in this region.
	 *
	 * @param key cache key
	 * @return {@code true} if Redis reports the key as present; {@code false}
	 *         if it is absent, no Redis template is configured, or the lookup
	 *         fails
	 */
	@Override
	public boolean contains(Object key) {
		RedisTemplate<String, Object> template = getRedisTemplate();
		if (template == null) {
			return false;
		}
		try {
			log.debug("contains key={}", key);
			// hasKey returns a Boolean that may be null; avoid unboxing it.
			return Boolean.TRUE.equals(template.hasKey(getkey(key)));
		} catch (Exception e) {
			log.warn("Fail to exists key. key={}", key, e);
			return false;
		}
	}

	/**
	 * Returns the number of Redis keys that currently match this region's
	 * prefix.
	 *
	 * <p>
	 * NOTE(review): Hibernate's {@code Region} contract describes this value
	 * as a size in bytes; the key count is kept here for backward
	 * compatibility - confirm whether {@code -1} would be more appropriate.
	 *
	 * @return number of keys found by {@link #getGroupKeys()}
	 */
	@Override
	public long getSizeInMemory() {
		return getGroupKeys().size();
	}

	/**
	 * Returns the number of Redis keys that currently match this region's
	 * prefix.
	 *
	 * @return number of keys found by {@link #getGroupKeys()}
	 */
	@Override
	public long getElementCountInMemory() {
		return getGroupKeys().size();
	}

	/**
	 * Always returns {@code -1}.
	 *
	 * @return {@code -1}
	 */
	@Override
	public long getElementCountOnDisk() {
		return -1;
	}

	/**
	 * Returns a snapshot of the region's entries, keyed by their full Redis
	 * key.
	 *
	 * <p>
	 * The keys returned by {@link #getGroupKeys()} already carry the region
	 * prefix, so the values are read with those keys directly. Going through
	 * {@link #get(Object)} would apply {@link #getkey(Object)} again and look
	 * up a doubly-prefixed key for every key that has no {@code '#'}; it would
	 * also refresh the TTL of each entry.
	 *
	 * @return a new mutable map of full Redis key to cached value; entries
	 *         whose value is no longer present are omitted
	 */
	@Override
	public Map<String, Object> toMap() {
		Map<String, Object> snapshot = new HashMap<>();
		Set<String> fullKeys = getGroupKeys();
		if (fullKeys.isEmpty()) {
			return snapshot;
		}
		RedisTemplate<String, Object> template = getRedisTemplate();
		for (String fullKey : fullKeys) {
			Object value = template.opsForValue().get(fullKey);
			// The entry may have expired between the key scan and the read.
			if (value != null) {
				snapshot.put(fullKey, value);
			}
		}
		return snapshot;
	}

	/**
	 * Returns the next timestamp produced by {@link RedisTimestamp} from this
	 * region's Redis counter.
	 *
	 * @return the next timestamp
	 */
	@Override
	public long nextTimestamp() {
		return RedisTimestamp.nextTimestamp(redisTimestamp);
	}

	/**
	 * Returns the time-to-live to apply to an entry.
	 *
	 * @return the base expiry in seconds plus a random jitter of 0-9 seconds
	 */
	@Override
	public int getTimeout() {
		return random.nextInt(10) + expiryInSeconds;
	}

	/**
	 * Builds the full Redis key for a cache key.
	 *
	 * <p>
	 * The key is converted with {@code toString()}. If the result contains
	 * {@code '#'}, only the part starting at the first {@code '#'} is kept
	 * (presumably to drop an entity-name prefix in front of the id - verify
	 * against the key type in use); otherwise the whole string is used. The
	 * region prefix is then prepended.
	 *
	 * @param keyObj cache key; must not be {@code null}
	 * @return the region prefix followed by the (possibly shortened) key text
	 */
	public String getkey(Object keyObj) {
		String key = keyObj.toString();
		int hashIndex = key.indexOf('#');
		if (hashIndex >= 0) {
			return getName() + key.substring(hashIndex);
		}
		return getName() + key;
	}

	/**
	 * Reads a cached value and, when no other thread is currently doing so,
	 * refreshes the entry's TTL first.
	 *
	 * @param key cache key
	 * @return the cached value, or {@code null} if it is absent or the read
	 *         fails
	 */
	public Object get(@NonNull Object key) {
		try {
			BoundValueOperations<String, Object> operations = getRedisTemplate().boundValueOps(getkey(key));
			// Only the thread that wins the flag refreshes the TTL; the
			// others go straight to the read.
			if (atomicBoolean.compareAndSet(false, true)) {
				try {
					operations.expire(getTimeout(), TimeUnit.SECONDS);
				} finally {
					atomicBoolean.set(false);
				}
			}
			return operations.get();
		} catch (Exception e) {
			log.warn("Fail to get cache item... key={}", key, e);
			return null;
		}
	}

	/**
	 * Stores a value with the TTL returned by {@link #getTimeout()}.
	 * Failures are logged and otherwise ignored.
	 *
	 * @param key   cache key
	 * @param value value to cache
	 */
	public void put(@NonNull Object key, @NonNull Object value) {
		try {
			getRedisTemplate().opsForValue().set(getkey(key), value, getTimeout(), TimeUnit.SECONDS);
		} catch (Exception e) {
			log.warn("Fail to put cache item... key={}", key, e);
		}
	}

	/**
	 * Deletes a cached value. Failures are logged and otherwise ignored.
	 *
	 * @param key cache key
	 */
	public void remove(@NonNull Object key) throws CacheException {
		try {
			getRedisTemplate().delete(getkey(key));
		} catch (Exception e) {
			log.warn("Fail to remove cache item... key={}", key, e);
		}
	}

	/**
	 * Deletes every key that matches this region's prefix. Failures are
	 * logged and otherwise ignored.
	 */
	public void clear() {
		try {
			Set<String> fullKeys = getGroupKeys();
			// Nothing to delete: skip the round trip to Redis.
			if (!fullKeys.isEmpty()) {
				getRedisTemplate().delete(fullKeys);
			}
		} catch (Exception e) {
			log.warn("Fail to clear region... name={}", getName(), e);
		}
	}

	/**
	 * Lists the full Redis keys that match the pattern
	 * {@code getName() + "*"}.
	 *
	 * @return the matching keys; an empty set if no Redis template is
	 *         configured or the template returns {@code null}
	 */
	public Set<String> getGroupKeys() {
		RedisTemplate<String, Object> template = getRedisTemplate();
		if (template == null) {
			return Collections.emptySet();
		}
		Set<String> keys = template.keys(getName() + "*");
		return keys != null ? keys : Collections.<String>emptySet();
	}

	/**
	 * Returns the shared Redis template used for cache entries.
	 *
	 * @return {@link RedisCacheFactoryAutoConfiguration#REDIS_TEMPLATE}; may be
	 *         {@code null} (callers such as {@link #contains(Object)} check
	 *         for this)
	 */
	public RedisTemplate<String, Object> getRedisTemplate() {
		return RedisCacheFactoryAutoConfiguration.REDIS_TEMPLATE;
	}
}
